package drds.data_propagate.parse.multistage_coordinator;

import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler;
import drds.data_propagate.binlog_event.protogenesis.BinlogEventType;
import drds.data_propagate.binlog_event.protogenesis.event.row_based_replication_events.RowsEvent;
import drds.data_propagate.binlog_event.secondary.Entry;
import drds.data_propagate.parse.MessageEvent;
import drds.data_propagate.parse.exception.ParseException;

public class DmlParseStageHandler implements WorkHandler<MessageEvent>, LifecycleAware {

    private MultistageCoordinatorImpl multiStageCoprocessor;

    public DmlParseStageHandler(MultistageCoordinatorImpl multiStageCoprocessor) {
        this.multiStageCoprocessor = multiStageCoprocessor;
    }

    @Override
    public void onEvent(MessageEvent messageEvent) throws Exception {
        try {
            if (messageEvent.isNeedDmlParse()) {
                int eventType = messageEvent.getBinLogEvent().getHeader().getEventType();
                Entry entry = null;
                switch (eventType) {
                    case BinlogEventType.binlog_event_rows_query_log_event:
                        entry = multiStageCoprocessor.binlogEventConvertToEntry.parse(messageEvent.getBinLogEvent(), false);
                        break;
                    default:
                        // 单独解析dml事件
                        entry = multiStageCoprocessor.binlogEventConvertToEntry.parseRowsEvent(messageEvent.getTableMetaData(), (RowsEvent) messageEvent.getBinLogEvent());
                }

                messageEvent.setEntry(entry);
            }
        } catch (Throwable e) {
            multiStageCoprocessor.exception = new ParseException(e);
            throw multiStageCoprocessor.exception;
        }
    }

    @Override
    public void onStart() {

    }

    @Override
    public void onShutdown() {

    }
}
